@@ -0,0 +1,55 @@ |
||
| 1 |
+module Agents |
|
| 2 |
+ class DigestAgent < Agent |
|
| 3 |
+ include FormConfigurable |
|
| 4 |
+ |
|
| 5 |
+ default_schedule "6am" |
|
| 6 |
+ |
|
| 7 |
+ description <<-MD |
|
| 8 |
+ The Digest Agent collects any Events sent to it and emits them as a single event. |
|
| 9 |
+ |
|
| 10 |
+ The resulting Event will have a payload message of `message`. You can use liquid templating in the `message`, have a look at the [Wiki](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) for details. |
|
| 11 |
+ |
|
| 12 |
+ Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent. |
|
| 13 |
+ MD |
|
| 14 |
+ |
|
| 15 |
+ event_description <<-MD |
|
| 16 |
+ Events look like this: |
|
| 17 |
+ |
|
| 18 |
+ {
|
|
| 19 |
+ "events": [ event list ], |
|
| 20 |
+ "message": "Your message" |
|
| 21 |
+ } |
|
| 22 |
+ MD |
|
| 23 |
+ |
|
| 24 |
+ def default_options |
|
| 25 |
+ {
|
|
| 26 |
+ "expected_receive_period_in_days" => "2", |
|
| 27 |
+ "message" => "{{ events | map: 'message' | join: ',' }}"
|
|
| 28 |
+ } |
|
| 29 |
+ end |
|
| 30 |
+ |
|
| 31 |
+ form_configurable :message, type: :text |
|
| 32 |
+ form_configurable :expected_receive_period_in_days |
|
| 33 |
+ |
|
| 34 |
+ def working? |
|
| 35 |
+ last_receive_at && last_receive_at > interpolated["expected_receive_period_in_days"].to_i.days.ago && !recent_error_logs? |
|
| 36 |
+ end |
|
| 37 |
+ |
|
| 38 |
+ def receive(incoming_events) |
|
| 39 |
+ self.memory["queue"] ||= [] |
|
| 40 |
+ incoming_events.each do |event| |
|
| 41 |
+ self.memory["queue"] << event.id |
|
| 42 |
+ end |
|
| 43 |
+ end |
|
| 44 |
+ |
|
| 45 |
+ def check |
|
| 46 |
+ if self.memory["queue"] && self.memory["queue"].length > 0 |
|
| 47 |
+ events = received_events.where(id: self.memory["queue"]).order(id: :asc).to_a |
|
| 48 |
+ payload = { "events" => events.map { |event| event.payload } }
|
|
| 49 |
+ payload["message"] = interpolated(payload)["message"] |
|
| 50 |
+ create_event :payload => payload |
|
| 51 |
+ self.memory["queue"] = [] |
|
| 52 |
+ end |
|
| 53 |
+ end |
|
| 54 |
+ end |
|
| 55 |
+end |
@@ -0,0 +1,68 @@ |
||
| 1 |
+require "rails_helper" |
|
| 2 |
+ |
|
| 3 |
+describe Agents::DigestAgent do |
|
| 4 |
+ before do |
|
| 5 |
+ @checker = Agents::DigestAgent.new(:name => "something", :options => { :expected_receive_period_in_days => "2", :message => "{{ events | map:'data' | join:';' }}" })
|
|
| 6 |
+ @checker.user = users(:bob) |
|
| 7 |
+ @checker.save! |
|
| 8 |
+ end |
|
| 9 |
+ |
|
| 10 |
+ describe "#working?" do |
|
| 11 |
+ it "checks to see if the Agent has received any events in the last 'expected_receive_period_in_days' days" do |
|
| 12 |
+ event = Event.new |
|
| 13 |
+ event.agent = agents(:bob_rain_notifier_agent) |
|
| 14 |
+ event.payload = { :data => "event" }
|
|
| 15 |
+ event.save! |
|
| 16 |
+ |
|
| 17 |
+ expect(@checker).not_to be_working # no events have ever been received |
|
| 18 |
+ Agents::DigestAgent.async_receive(@checker.id, [event.id]) |
|
| 19 |
+ expect(@checker.reload).to be_working # Events received |
|
| 20 |
+ three_days_from_now = 3.days.from_now |
|
| 21 |
+ stub(Time).now { three_days_from_now }
|
|
| 22 |
+ expect(@checker.reload).not_to be_working # too much time has passed |
|
| 23 |
+ end |
|
| 24 |
+ end |
|
| 25 |
+ |
|
| 26 |
+ describe "#receive" do |
|
| 27 |
+ it "queues any payloads it receives" do |
|
| 28 |
+ event1 = Event.new |
|
| 29 |
+ event1.agent = agents(:bob_rain_notifier_agent) |
|
| 30 |
+ event1.payload = { :data => "event1" }
|
|
| 31 |
+ event1.save! |
|
| 32 |
+ |
|
| 33 |
+ event2 = Event.new |
|
| 34 |
+ event2.agent = agents(:bob_weather_agent) |
|
| 35 |
+ event2.payload = { :data => "event2" }
|
|
| 36 |
+ event2.save! |
|
| 37 |
+ |
|
| 38 |
+ Agents::DigestAgent.async_receive(@checker.id, [event1.id, event2.id]) |
|
| 39 |
+ expect(@checker.reload.memory[:queue]).to eq([event1.id, event2.id]) |
|
| 40 |
+ end |
|
| 41 |
+ end |
|
| 42 |
+ |
|
| 43 |
+ describe "#check" do |
|
| 44 |
+ it "should emit a event" do |
|
| 45 |
+ expect { Agents::DigestAgent.async_check(@checker.id) }.not_to change { Event.count }
|
|
| 46 |
+ |
|
| 47 |
+ event1 = Event.new |
|
| 48 |
+ event1.agent = agents(:bob_rain_notifier_agent) |
|
| 49 |
+ event1.payload = { :data => "event" }
|
|
| 50 |
+ event1.save! |
|
| 51 |
+ |
|
| 52 |
+ event2 = Event.new |
|
| 53 |
+ event2.agent = agents(:bob_weather_agent) |
|
| 54 |
+ event2.payload = { :data => "event" }
|
|
| 55 |
+ event2.save! |
|
| 56 |
+ |
|
| 57 |
+ Agents::DigestAgent.async_receive(@checker.id, [event1.id, event2.id]) |
|
| 58 |
+ @checker.sources << agents(:bob_rain_notifier_agent) << agents(:bob_weather_agent) |
|
| 59 |
+ @checker.save! |
|
| 60 |
+ |
|
| 61 |
+ expect { Agents::DigestAgent.async_check(@checker.id) }.to change { Event.count }.by(1)
|
|
| 62 |
+ @checker.reload |
|
| 63 |
+ expect(@checker.most_recent_event.payload["events"]).to eq([event1.payload, event2.payload]) |
|
| 64 |
+ expect(@checker.most_recent_event.payload["message"]).to eq("event;event")
|
|
| 65 |
+ expect(@checker.memory[:queue]).to be_empty |
|
| 66 |
+ end |
|
| 67 |
+ end |
|
| 68 |
+end |